
connection: clean up failed heartbeat sends #876

Open
dkropachev wants to merge 1 commit into scylladb:master from dkropachev:dk/heartbeat-send-failure-cleanup

Conversation


@dkropachev dkropachev commented May 6, 2026

Fixes #875.

Heartbeat sends can fail after a request id has already been reserved. This change keeps the request-id pool and in-flight accounting consistent across that failure path, and avoids double-releasing the slot on the control-connection branch.

Changes:

  • preserve heartbeat cleanup when send_msg() fails
  • only release the in-flight slot directly for control connections
  • add a regression test for a failed heartbeat send
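The bookkeeping this PR restores can be illustrated with a minimal, self-contained sketch. `FakeConnection` and `send_heartbeat` below are illustrative stand-ins, not the driver's classes; in the real driver this logic lives in `HeartbeatFuture.__init__` around `Connection.send_msg()`.

```python
# Minimal sketch of the failure-path cleanup (FakeConnection / send_heartbeat
# are hypothetical stand-ins, NOT the driver's actual classes).
class FakeConnection:
    def __init__(self, is_control_connection=False):
        self.request_ids = [0]        # pool of free request ids
        self._requests = {}           # request_id -> registered callback
        self.in_flight = 0
        self.is_control_connection = is_control_connection

    def get_request_id(self):
        self.in_flight += 1
        return self.request_ids.pop()

    def send_msg(self, msg, request_id, cb):
        # like the driver, register the callback *before* writing the socket
        self._requests[request_id] = cb
        raise OSError("socket write failed")  # simulate a failed push()


def send_heartbeat(connection):
    """Reserve a request id, try to send, unwind the bookkeeping on failure."""
    request_id = connection.get_request_id()
    try:
        connection.send_msg("OPTIONS", request_id, cb=lambda response: None)
    except Exception:
        if connection.is_control_connection:
            # ControlConnection.return_connection() never decrements
            # in_flight, so the slot must be released here;
            # HostConnection.return_connection() does it itself.
            connection.in_flight -= 1
        # undo the registration send_msg() made before the write failed
        connection._requests.pop(request_id, None)
        if request_id not in connection.request_ids:
            connection.request_ids.append(request_id)
        return False
    return True
```

After a failed send on a control connection, the request id is back in the pool, the callback registration is gone, and `in_flight` is back to its previous value.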

@dkropachev dkropachev force-pushed the dk/heartbeat-send-failure-cleanup branch from 892831d to 358e016 on May 7, 2026 06:50
@dkropachev dkropachev self-assigned this May 7, 2026
@dkropachev dkropachev marked this pull request as ready for review May 7, 2026 10:15
@sylwiaszunejko sylwiaszunejko requested a review from Copilot May 7, 2026 11:12

Copilot AI left a comment


Pull request overview

This PR fixes a heartbeat failure edge case where send_msg() can fail after a request id has been reserved, leaking the request id / callback registration and (for control connections) potentially leaving in_flight incorrectly incremented. It also adds a regression test to ensure request-id and in-flight bookkeeping is restored after a failed heartbeat send.

Changes:

  • Update HeartbeatFuture to unwind _requests registration and return the reserved request id when send_msg() fails.
  • Ensure in_flight is released directly only for control connections (since the control-connection owner’s return_connection() doesn’t decrement in_flight).
  • Add a unit test covering failed heartbeat send cleanup.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.

cassandra/connection.py: Adds failure-path cleanup in HeartbeatFuture for send_msg() exceptions (request id + _requests unwind; in_flight handling for control connections).
tests/unit/test_connection.py: Adds a regression test ensuring the request id pool, _requests, and in_flight are consistent after a heartbeat send failure.


Comment thread cassandra/connection.py
Comment on lines +1819 to 1828
request_id = connection.get_request_id()
try:
    connection.send_msg(OptionsMessage(), request_id, self._options_callback)
except Exception as exc:
    connection.in_flight -= 1
    if request_id not in connection._requests and request_id not in connection.request_ids:
        connection.request_ids.append(request_id)
    self._exception = exc
    self._event.set()
else:

Comment for the first commit. The assumption I make in this comment: if self.push(msg) fails, then the connection is broken.

There is one more edge case: if we fail after self._requests[request_id] = (cb, decoder, result_metadata) but before self.push(msg), then _requests will contain the new request, but it will never be sent, effectively leaving it orphaned without being accounted for as such.
Can we fix that? It would require a cleanup path for request_id in connection._requests, which I'm afraid is a bit too dangerous.

Collaborator Author


thanks, fixed

Comment thread cassandra/connection.py
Comment on lines +1823 to +1828
if connection.is_control_connection:
    connection.in_flight -= 1
# send_msg() registers the callback before writing to the socket,
# so a write failure must unwind that registration here.
connection._requests.pop(request_id, None)
if request_id not in connection.request_ids:


Sorry, I just don't understand the second commit. Why do you treat the CC differently here? And why do we care about a socket write error: won't it result in the connection being closed anyway?

Collaborator Author


CC is special because ControlConnection.return_connection() does not decrement in_flight, while HostConnection.return_connection() does. The write failure still matters because send_msg() has already registered the callback and reserved the request id before push(), so that bookkeeping has to be unwound explicitly.



You mention return_connection but I don't see where it is called :(

Collaborator Author

@dkropachev dkropachev May 8, 2026


I meant the later owner.return_connection(connection) call in ConnectionHeartbeat.run:

def run(self):
    self._shutdown_event.wait(self._interval)
    while not self._shutdown_event.is_set():
        start_time = time.time()
        futures = []
        failed_connections = []
        try:
            for connections, owner in [(o.get_connections(), o) for o in self._get_connection_holders()]:
                for connection in connections:
                    self._raise_if_stopped()
                    if not (connection.is_defunct or connection.is_closed):
                        if connection.is_idle:
                            try:
                                futures.append(HeartbeatFuture(connection, owner))
                            except Exception as e:
                                log.warning("Failed sending heartbeat message on connection (%s) to %s",
                                            id(connection), connection.endpoint)
                                failed_connections.append((connection, owner, e))
                        else:
                            connection.reset_idle()
                    else:
                        log.debug("Cannot send heartbeat message on connection (%s) to %s",
                                  id(connection), connection.endpoint)
                        # make sure the owner sees this defunct/closed connection
                        owner.return_connection(connection)
            self._raise_if_stopped()

            # Wait max `self._timeout` seconds for all HeartbeatFutures to complete
            timeout = self._timeout
            start_time = time.time()
            for f in futures:
                self._raise_if_stopped()
                connection = f.connection
                try:
                    f.wait(timeout)
                    # TODO: move this, along with connection locks in pool, down into Connection
                    with connection.lock:
                        connection.in_flight -= 1
                    connection.reset_idle()
                except Exception as e:
                    log.warning("Heartbeat failed for connection (%s) to %s",
                                id(connection), connection.endpoint)
                    failed_connections.append((f.connection, f.owner, e))
                timeout = self._timeout - (time.time() - start_time)

            for connection, owner, exc in failed_connections:
                self._raise_if_stopped()
                if not connection.is_control_connection:
                    # Only HostConnection supports shutdown_on_error
                    owner.shutdown_on_error = True
                connection.defunct(exc)
                owner.return_connection(connection)

That block only unwinds the callback/request-id registration that send_msg() already did:

def send_msg(self, msg, request_id, cb, encoder=ProtocolHandler.encode_message,
             decoder=ProtocolHandler.decode_message, result_metadata=None):
    if self.is_defunct:
        msg = "Connection to %s is defunct" % self.endpoint
        if self.last_error:
            msg += ": %s" % (self.last_error,)
        raise ConnectionShutdown(msg)
    elif self.is_closed:
        msg = "Connection to %s is closed" % self.endpoint
        if self.last_error:
            msg += ": %s" % (self.last_error,)
        raise ConnectionShutdown(msg)
    elif not self._socket_writable:
        raise ConnectionBusy("Connection %s is overloaded" % self.endpoint)

    # queue the decoder function with the request
    # this allows us to inject custom functions per request to encode, decode messages
    self._requests[request_id] = (cb, decoder, result_metadata)

    msg = encoder(msg, request_id, self.protocol_version, compressor=self.compressor,
                  allow_beta_protocol_version=self.allow_beta_protocol_version)

    if self._is_checksumming_enabled:
        buffer = io.BytesIO()
        self._segment_codec.encode(buffer, msg)
        msg = buffer.getvalue()

    self.push(msg)
    return len(msg)

For control connections, ControlConnection.return_connection() does not decrement in_flight:

def return_connection(self, connection):
    if connection is self._connection and (connection.is_defunct or connection.is_closed):
        self.reconnect()

while HostConnection.return_connection() does, so the direct decrement has to stay here. It can't be handled in ControlConnection.return_connection() because that method only sees a defunct/closed connection at the end of the heartbeat cycle:

def return_connection(self, connection, stream_was_orphaned=False):
    if not stream_was_orphaned:
        with connection.lock:
            connection.in_flight -= 1
    with self._stream_available_condition:
        self._stream_available_condition.notify()

It does not know which request_id was reserved, and it does not have the context that send_msg() already registered the callback in _requests. The leak happens in the send_msg() failure path, while we still have the request_id and can unwind _requests / request_ids immediately.

Keep heartbeat request-id and in-flight bookkeeping consistent when send_msg() fails.

Handle the control-connection in_flight release separately from HostConnection cleanup.
@dkropachev dkropachev force-pushed the dk/heartbeat-send-failure-cleanup branch from 358e016 to 6ad10b4 on May 7, 2026 13:55
@dkropachev dkropachev requested a review from Lorak-mmk May 7, 2026 14:24
@Lorak-mmk

is this the right way to fix it?
On normal (non-exception path) the in_flight and other stuff is released somewhere by the connection (process_msg? Or some other method, I don't know). I think so because I see no other place in HeartbeatFuture where it is handled.
So perhaps it should be the same for the exception path? In other words, maybe send_msg should handle it?

@dkropachev
Collaborator Author

is this the right way to fix it? On normal (non-exception path) the in_flight and other stuff is released somewhere by the connection (process_msg? Or some other method, I don't know). I think so because I see no other place in HeartbeatFuture where it is handled. So perhaps it should be the same for the exception path? In other words, maybe send_msg should handle it?

Can't agree more: there has to be infrastructure that handles request scheduling, execution, and failure with respect to borrowing/returning request_id, tracking in_flight, etc.
It should be something like a set of Future classes and a proper API on the connection to handle all the cases.
Unfortunately it is not like that today, and in_flight and request_ids are manipulated all over the code.
IMHO we should fix the obvious problems and then refactor the whole area to make it sane.
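As a rough illustration of the kind of infrastructure being proposed here (every name below is hypothetical; nothing like this exists in the driver today), the reserve/unwind pairing could live in a single context manager so the success and failure paths cannot drift apart. FakeConnection is a minimal stand-in used only to make the sketch runnable:

```python
# Hypothetical sketch: centralize request-id reservation and unwinding in one
# place instead of scattering it across callers. Names are illustrative only.
from contextlib import contextmanager

class FakeConnection:
    """Stand-in with the same bookkeeping fields as the driver's Connection."""
    def __init__(self):
        self.request_ids = [1]   # pool of free request ids
        self._requests = {}      # request_id -> registered callback
        self.in_flight = 0

    def get_request_id(self):
        self.in_flight += 1
        return self.request_ids.pop()

@contextmanager
def request_slot(connection):
    """Reserve a request id; on any failure, unwind every piece of bookkeeping."""
    request_id = connection.get_request_id()
    try:
        yield request_id
    except Exception:
        connection.in_flight -= 1
        connection._requests.pop(request_id, None)
        if request_id not in connection.request_ids:
            connection.request_ids.append(request_id)
        raise

conn = FakeConnection()
try:
    with request_slot(conn) as rid:
        conn._requests[rid] = "callback"      # what send_msg() does first
        raise OSError("socket write failed")  # the push() failure
except OSError:
    pass
```

With a construct like this, HeartbeatFuture (and any other caller) would only describe the send itself, and a failed push() could never leak a request id or an in_flight slot.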
